package cn.chongho.inf.flink.job;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * sql temp job.
 * @author ming
 */
/**
 * Generic Flink SQL template job: reads semicolon-separated SQL statements from
 * the {@code --jobSql} argument and executes each one against a streaming table
 * environment configured for exactly-once checkpointing.
 *
 * <p>Recognized arguments:
 * <ul>
 *   <li>{@code --jobSql}          semicolon-separated SQL statements (required)</li>
 *   <li>{@code --checkpoint.path} checkpoint storage URI
 *       (default {@code file:///tmp/flink/checkpoints})</li>
 * </ul>
 *
 * @author ming
 */
public class FlinkSqlTemplateJob {

    /**
     * Entry point: configures checkpointing/state backend, then runs the SQL.
     *
     * @param args command-line arguments parsed by {@link ParameterTool}
     * @throws IllegalArgumentException if {@code --jobSql} is missing or blank
     */
    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration: exactly-once, every 60 seconds, at most one
        // checkpoint in flight, tolerating up to 3 checkpoint failures before
        // the job itself fails.
        env.enableCheckpointing(60 * 1000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // Keep working state on the JVM heap; persist checkpoint data to the
        // configured storage path (local filesystem by default).
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                parameterTool.get("checkpoint.path", "file:///tmp/flink/checkpoints"));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Run the SQL. Fail loudly on a missing script instead of silently
        // exiting with success, which would hide a misconfigured submission.
        String jobSql = parameterTool.get("jobSql");
        if (StringUtils.isBlank(jobSql)) {
            throw new IllegalArgumentException("Missing required argument: --jobSql");
        }

        // Execute each statement; trim and skip blank fragments so a trailing
        // semicolon or blank line between statements does not hand an empty
        // string to the SQL parser. NOTE(review): a plain split(";") also
        // breaks statements containing ';' inside string literals — acceptable
        // for a template job, but worth keeping in mind.
        for (String sql : jobSql.split(";")) {
            String statement = sql.trim();
            if (!statement.isEmpty()) {
                tableEnv.executeSql(statement);
            }
        }
    }
}
